<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">
<!--
    Copyright 2013 James McClure

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
-->
<!--
Design by Free CSS Templates
http://www.freecsstemplates.org
Released for free under a Creative Commons Attribution 3.0 License

Name       : Singular
Description: A two-column, fixed-width design with a neutral color scheme.
Version    : 1.0
Released   : 20121119
-->
<html xmlns="http://www.w3.org/1999/xhtml">
	<head>
		<meta http-equiv="content-type" content="text/html; charset=utf-8" />
		<meta name="description" content="A Java MQTT client, Mock MQTT Broker, HTTP to MQTT gateway, and MQTT clustering proxy" />
		<meta name="keywords" content="" />
		<title>Xenqtt - A Simple and Innovative Toolkit for MQTT Integration</title>
		<link rel="shortcut icon" href="images/favicon.ico" />
		<link href="resources/prettify.css" type="text/css" rel="stylesheet" />
		<script type="text/javascript" src="resources/prettify.js"></script>
		<link rel="stylesheet" type="text/css" href="style.css" />
		<script type="text/javascript">
		  // Google Analytics loader. Stores the name of the global command function ('ga') on window, installs a stub under that name
		  // that pushes its arguments onto a queue (q) unless a function with that name already exists, records the current time (l),
		  // then creates an async script element for analytics.js and inserts it ahead of the first script element in the document.
		  (function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
		  (i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o),
		  m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
		  })(window,document,'script','//www.google-analytics.com/analytics.js','ga');
		
		  // Issue two commands through 'ga': a 'create' for this property ID and domain, then a 'send' of a 'pageview'.
		  ga('create', 'UA-44597320-1', 'sourceforge.net');
		  ga('send', 'pageview');
		
		</script>
	</head>
	<body onload="prettyPrint()">
		<div id="wrapper">
			<div id="header">
				<img src="images/happy-minion.png" alt="" />
				<div id="logo">
					<h1><a href="index.html">XenQTT</a></h1>
				</div>
				<div id="menu">
					<ul>
						<li class="first"><a href="index.html">Home</a></li>
						<li><a href="features.html">Features</a></li>
						<li><a href="documentation.html">Documentation</a></li>
						<li><a href="http://sourceforge.net/projects/xenqtt/files">Downloads</a></li>
						<li><a href="http://sourceforge.net/p/xenqtt/tickets">Tickets</a></li>
						<li class="last"><a href="about.html">About</a></li>
					</ul>
					<br class="clearfix" />
				</div>
			</div>
			<div id="page">
				<div id="content">
					<div class="post">
						<h2>XenQTT Documentation</h2>
						<ul class="list">
						<li class="first"><a href="apidocs/">API Docs (JavaDoc)</a></li>
						<li><a href="#apps">Running Applications</a></li>
						<li><a href="#maven">Maven Integration</a></li>
						<li><a href="#clientapi">Using the Client API</a></li>
						<li><a href="#mockbrokerapi">Using the Mock Broker API</a></li>
						<li class="last"><a href="#links">Related Resources</a></li>
						</ul>
					</div>
					<div class="post" id="apps">
						<h3>Running Applications</h3>
						<p>
						XenQTT is released as a single jar. To run any XenQTT application you execute the jar:
						</p>
						<pre class="prettyprint code">
java -jar xenqtt-version.jar
						</pre>
						<p>
						Where <code>version</code> is the XenQTT version number which is part of the JAR file name.
						The command above will give you usage details. For much more detailed info execute this:
						</p>
						<pre class="prettyprint code">
java -jar xenqtt-version.jar help
						</pre>
					</div>
					<div class="post">
						<h3 id="maven">Maven Integration</h3>
						<p>
						XenQTT is released to the central Maven repository. Add this dependency element with the latest version number to your <code>pom.xml</code>:
						</p>
						<pre class="prettyprint code">
&lt;dependency&gt;
    &lt;groupId&gt;net.sf.xenqtt&lt;/groupId&gt;
    &lt;artifactId&gt;xenqtt&lt;/artifactId&gt;
    &lt;version&gt;...&lt;/version&gt;
&lt;/dependency&gt;	</pre>
					</div>
					<div class="post" id="clientapi">
						<h3>Using the Client API</h3>
						<strong>Overview</strong>
						<p>
						XenQTT provides two distinct MQTT clients for interfacing with MQTT brokers. One is a synchronous client where all operations to the
						broker (connect, publish, subscribe, etc.) are blocking. The other is an asynchronous client where operations to the broker are
						non-blocking. Interaction with broker events in the asynchronous client is handled via callbacks in a specific interface that the
						user must implement.
						</p>
						<p>
						The following sections show how to use both the synchronous and asynchronous clients. You are taken through a subscriber and publisher
						model where the subscriber attempts to build up a musical catalog of classic rock with a paired publisher that is only too happy to
						provide.
						</p>
						<p>All examples are included in the xenqtt jar in the <code>net.sf.xenqtt.examples</code> package.</p>
						<strong>Synchronous Client Example</strong>
						<p class="code-header">Synchronous Subscriber</p>
						<pre class="prettyprint code">
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import net.sf.xenqtt.client.MqttClient;
import net.sf.xenqtt.client.MqttClientListener;
import net.sf.xenqtt.client.PublishMessage;
import net.sf.xenqtt.client.Subscription;
import net.sf.xenqtt.client.SyncMqttClient;
import net.sf.xenqtt.message.ConnectReturnCode;
import net.sf.xenqtt.message.QoS;

import org.apache.log4j.Logger;

/**
 * Builds music catalogs from years gone by.
 *
 * Connects with the synchronous client, subscribes to a few classic rock topics, collects every payload received during a 30 second window, logs the
 * collected records and finally unsubscribes and disconnects.
 */
public class MusicSubscriber {

	private static final Logger log = Logger.getLogger(MusicSubscriber.class);

	public static void main(String... args) throws Throwable {
		// The listener appends to the catalog while this method is sleeping or reading it, so the list is wrapped in a synchronized view.
		final List&lt;String&gt; catalog = Collections.synchronizedList(new ArrayList&lt;String&gt;());
		MqttClientListener listener = new MqttClientListener() {

			@Override
			public void publishReceived(MqttClient client, PublishMessage message) {
				// Record the payload, then acknowledge the message.
				catalog.add(message.getPayloadString());
				message.ack();
			}

			@Override
			public void disconnected(MqttClient client, Throwable cause, boolean reconnecting) {
				if (cause != null) {
					log.error("Disconnected from the broker due to an exception.", cause);
				} else {
					log.info("Disconnecting from the broker.");
				}

				if (reconnecting) {
					log.info("Attempting to reconnect to the broker.");
				}
			}

		};

		// Build your client. This client is a synchronous one so all interaction with the broker will block until said interaction completes.
		SyncMqttClient client = new SyncMqttClient("tcp://mqtt.broker:1883", listener, 5);
		try {
			// Connect to the broker with a specific client ID. Only if the broker accepted the connection shall we proceed.
			ConnectReturnCode returnCode = client.connect("musicLover", true);
			if (returnCode != ConnectReturnCode.ACCEPTED) {
				log.error("Unable to connect to the MQTT broker. Reason: " + returnCode);
				return;
			}

			// Create your subscriptions. In this case we want to build up a catalog of classic rock.
			List&lt;Subscription&gt; subscriptions = new ArrayList&lt;Subscription&gt;();
			subscriptions.add(new Subscription("grand/funk/railroad", QoS.AT_MOST_ONCE));
			subscriptions.add(new Subscription("jefferson/airplane", QoS.AT_MOST_ONCE));
			subscriptions.add(new Subscription("seventies/prog/#", QoS.AT_MOST_ONCE));
			client.subscribe(subscriptions);

			// Build up your catalog. After a while you've waited long enough so move on.
			try {
				Thread.sleep(30000);
			} catch (InterruptedException ignore) {
				// Deliberately ignored: an interrupt only shortens the collection window.
			}

			// Report on what we have found. We are still subscribed, so the listener may add records while we read; a synchronized list must
			// be locked by the caller for the duration of an iteration.
			synchronized (catalog) {
				for (String record : catalog) {
					log.debug("Got a record: " + record);
				}
			}

			// We are done. Unsubscribe from further updates.
			List&lt;String&gt; topics = new ArrayList&lt;String&gt;();
			for (Subscription subscription : subscriptions) {
				topics.add(subscription.getTopic());
			}
			client.unsubscribe(topics);
		} catch (Exception ex) {
			log.error("An unexpected exception has occurred.", ex);
		} finally {
			// Runs on every path out of the try block, including the early return above.
			if (!client.isClosed()) {
				client.disconnect();
			}
		}
	}

}
						</pre>
						<p class="code-header">Synchronous Publisher</p>
						<pre class="prettyprint code">
import net.sf.xenqtt.client.MqttClient;
import net.sf.xenqtt.client.MqttClientListener;
import net.sf.xenqtt.client.PublishMessage;
import net.sf.xenqtt.client.SyncMqttClient;
import net.sf.xenqtt.message.ConnectReturnCode;
import net.sf.xenqtt.message.QoS;

import org.apache.log4j.Logger;

/**
 * Produces hit music from days gone by: connects with the synchronous (blocking) client and publishes a small catalog of classic rock albums.
 */
public class MusicProducer {

	private static final Logger log = Logger.getLogger(MusicProducer.class);

	public static void main(String... args) throws Throwable {
		// Every call on the synchronous client blocks until the interaction with the broker has completed.
		MqttClient client = new SyncMqttClient("tcp://mqtt-broker:1883", new ProducerListener(), 5);
		try {
			ConnectReturnCode connectResult = client.connect("musicProducer", false, "music-user", "music-pass");
			if (connectResult == ConnectReturnCode.ACCEPTED) {
				// Topic and album title pairs, published in the order listed.
				String[][] albums = {
						{ "grand/funk/railroad", "On Time" },
						{ "grand/funk/railroad", "E Pluribus Funk" },
						{ "jefferson/airplane", "Surrealistic Pillow" },
						{ "jefferson/airplane", "Crown of Creation" },
						{ "seventies/prog/rush", "2112" },
						{ "seventies/prog/rush", "A Farewell to Kings" },
						{ "seventies/prog/rush", "Hemispheres" } };
				for (String[] album : albums) {
					client.publish(new PublishMessage(album[0], QoS.AT_MOST_ONCE, album[1]));
				}
			} else {
				log.error("Unable to connect to the broker. Reason: " + connectResult);
			}
		} catch (Exception e) {
			log.error("An exception prevented the publishing of the full catalog.", e);
		} finally {
			// Whatever happened above, hang up if the client is still open.
			if (!client.isClosed()) {
				client.disconnect();
			}
		}
	}

	/**
	 * Listener for a client that only publishes: logs disconnects and warns about any message that arrives unexpectedly.
	 */
	private static final class ProducerListener implements MqttClientListener {

		@Override
		public void publishReceived(MqttClient client, PublishMessage message) {
			log.warn("Received a message when no subscriptions were active. Check your broker ;)");
		}

		@Override
		public void disconnected(MqttClient client, Throwable cause, boolean reconnecting) {
			if (cause == null) {
				log.info("Disconnected from the broker.");
			} else {
				log.error("Disconnected from the broker due to an exception.", cause);
			}

			if (reconnecting) {
				log.info("Attempting to reconnect to the broker.");
			}
		}
	}

}
						</pre>
						<strong>Asynchronous Client Example</strong>
						<p class="code-header">Asynchronous Subscriber</p>
						<pre class="prettyprint code">
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

import net.sf.xenqtt.client.AsyncClientListener;
import net.sf.xenqtt.client.AsyncMqttClient;
import net.sf.xenqtt.client.MqttClient;
import net.sf.xenqtt.client.PublishMessage;
import net.sf.xenqtt.client.Subscription;
import net.sf.xenqtt.message.ConnectReturnCode;
import net.sf.xenqtt.message.QoS;

import org.apache.log4j.Logger;

/**
 * Builds music catalogs from years gone by.
 *
 * Asynchronous variant: connects without blocking, waits for the broker's reply to the connect request, subscribes to a few classic rock topics,
 * collects every payload received during a 30 second window, logs the collected records and finally unsubscribes and disconnects.
 */
public class MusicSubscriberAsync {

	private static final Logger log = Logger.getLogger(MusicSubscriberAsync.class);

	public static void main(String... args) throws Throwable {
		// Counted down by the listener once the broker has answered the connect request; the answer itself is stored in connectReturnCode.
		final CountDownLatch connectLatch = new CountDownLatch(1);
		final AtomicReference&lt;ConnectReturnCode&gt; connectReturnCode = new AtomicReference&lt;ConnectReturnCode&gt;();
		// The listener appends to the catalog while this method is sleeping or reading it, so the list is wrapped in a synchronized view.
		final List&lt;String&gt; catalog = Collections.synchronizedList(new ArrayList&lt;String&gt;());
		AsyncClientListener listener = new AsyncClientListener() {

			@Override
			public void publishReceived(MqttClient client, PublishMessage message) {
				// Record the payload, then acknowledge the message.
				catalog.add(message.getPayloadString());
				message.ack();
			}

			@Override
			public void disconnected(MqttClient client, Throwable cause, boolean reconnecting) {
				if (cause != null) {
					log.error("Disconnected from the broker due to an exception.", cause);
				} else {
					log.info("Disconnecting from the broker.");
				}

				if (reconnecting) {
					log.info("Attempting to reconnect to the broker.");
				}
			}

			@Override
			public void connected(MqttClient client, ConnectReturnCode returnCode) {
				// Publish the result first, then release the thread waiting in main().
				connectReturnCode.set(returnCode);
				connectLatch.countDown();
			}

			@Override
			public void published(MqttClient client, PublishMessage message) {
				// We do not publish so this should never be called, in theory ;).
			}

			@Override
			public void subscribed(MqttClient client, Subscription[] requestedSubscriptions, Subscription[] grantedSubscriptions, boolean requestsGranted) {
				if (!requestsGranted) {
					log.error("Unable to subscribe to the following subscriptions: " + Arrays.toString(requestedSubscriptions));
				}

				log.debug("Granted subscriptions: " + Arrays.toString(grantedSubscriptions));
			}

			@Override
			public void unsubscribed(MqttClient client, String[] topics) {
				log.debug("Unsubscribed from the following topics: " + Arrays.toString(topics));
			}

		};

		// Build your client. This client is an asynchronous one so all interaction with the broker will be non-blocking.
		AsyncMqttClient client = new AsyncMqttClient("tcp://mqtt-broker:1883", listener, 5);
		try {
			// Connect to the broker with a specific client ID. The call returns immediately, so wait for the listener to report the broker's
			// answer before reading it. Only if the broker accepted the connection shall we proceed.
			client.connect("musicLover", true);
			connectLatch.await();

			ConnectReturnCode returnCode = connectReturnCode.get();
			if (returnCode == null || returnCode != ConnectReturnCode.ACCEPTED) {
				log.error("Unable to connect to the MQTT broker. Reason: " + returnCode);
				return;
			}

			// Create your subscriptions. In this case we want to build up a catalog of classic rock.
			List&lt;Subscription&gt; subscriptions = new ArrayList&lt;Subscription&gt;();
			subscriptions.add(new Subscription("grand/funk/railroad", QoS.AT_MOST_ONCE));
			subscriptions.add(new Subscription("jefferson/airplane", QoS.AT_MOST_ONCE));
			subscriptions.add(new Subscription("seventies/prog/#", QoS.AT_MOST_ONCE));
			client.subscribe(subscriptions);

			// Build up your catalog. After a while you've waited long enough so move on.
			try {
				Thread.sleep(30000);
			} catch (InterruptedException ignore) {
				// Deliberately ignored: an interrupt only shortens the collection window.
			}

			// Report on what we have found. We are still subscribed, so the listener may add records while we read; a synchronized list must
			// be locked by the caller for the duration of an iteration.
			synchronized (catalog) {
				for (String record : catalog) {
					log.debug("Got a record: " + record);
				}
			}

			// We are done. Unsubscribe at this time.
			List&lt;String&gt; topics = new ArrayList&lt;String&gt;();
			for (Subscription subscription : subscriptions) {
				topics.add(subscription.getTopic());
			}
			client.unsubscribe(topics);
		} catch (Exception ex) {
			log.error("An unexpected exception has occurred.", ex);
		} finally {
			// Runs on every path out of the try block, including the early return above.
			if (!client.isClosed()) {
				client.disconnect();
			}
		}
	}

}
						</pre>
						<p class="code-header">Asynchronous Publisher</p>
						<pre class="prettyprint code">
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

import net.sf.xenqtt.client.AsyncClientListener;
import net.sf.xenqtt.client.AsyncMqttClient;
import net.sf.xenqtt.client.MqttClient;
import net.sf.xenqtt.client.PublishMessage;
import net.sf.xenqtt.client.Subscription;
import net.sf.xenqtt.message.ConnectReturnCode;
import net.sf.xenqtt.message.QoS;

import org.apache.log4j.Logger;

/**
 * Produces hit music from days gone by. Asynchronous variant: connects without blocking, waits for the broker's reply to the connect request and then
 * publishes a small catalog of classic rock albums.
 */
public class MusicProducerAsync {

	private static final Logger log = Logger.getLogger(MusicProducerAsync.class);

	public static void main(String... args) throws Throwable {
		// Counted down by the listener once the broker has answered the connect request; the answer itself is parked in brokerReply.
		final CountDownLatch replyReceived = new CountDownLatch(1);
		final AtomicReference&lt;ConnectReturnCode&gt; brokerReply = new AtomicReference&lt;ConnectReturnCode&gt;();

		AsyncClientListener callbacks = new AsyncClientListener() {

			@Override
			public void connected(MqttClient client, ConnectReturnCode returnCode) {
				brokerReply.set(returnCode);
				replyReceived.countDown();
			}

			@Override
			public void disconnected(MqttClient client, Throwable cause, boolean reconnecting) {
				if (cause == null) {
					log.info("Disconnected from the broker.");
				} else {
					log.error("Disconnected from the broker due to an exception.", cause);
				}

				if (reconnecting) {
					log.info("Attempting to reconnect to the broker.");
				}
			}

			@Override
			public void publishReceived(MqttClient client, PublishMessage message) {
				log.warn("Received a message when no subscriptions were active. Check your broker ;)");
			}

			@Override
			public void published(MqttClient client, PublishMessage message) {
				// Nothing to do.
			}

			@Override
			public void subscribed(MqttClient client, Subscription[] requestedSubscriptions, Subscription[] grantedSubscriptions, boolean requestsGranted) {
				// Nothing to do.
			}

			@Override
			public void unsubscribed(MqttClient client, String[] topics) {
				// Nothing to do.
			}

		};

		// Calls on the asynchronous client do not block; results are reported through the listener above.
		MqttClient client = new AsyncMqttClient("tcp://mqtt-broker:1883", callbacks, 5);
		try {
			// Ask to connect, then hold here until the listener has recorded the broker's reply.
			client.connect("musicProducerAsync", false, "music-user", "music-pass");
			replyReceived.await();

			ConnectReturnCode reply = brokerReply.get();
			if (reply == ConnectReturnCode.ACCEPTED) {
				// Topic and album title pairs, published in the order listed.
				String[][] albums = {
						{ "grand/funk/railroad", "On Time" },
						{ "grand/funk/railroad", "E Pluribus Funk" },
						{ "jefferson/airplane", "Surrealistic Pillow" },
						{ "jefferson/airplane", "Crown of Creation" },
						{ "seventies/prog/rush", "2112" },
						{ "seventies/prog/rush", "A Farewell to Kings" },
						{ "seventies/prog/rush", "Hemispheres" } };
				for (String[] album : albums) {
					client.publish(new PublishMessage(album[0], QoS.AT_MOST_ONCE, album[1]));
				}
			} else {
				// A missing reply is reported the same way as a rejection. We are done.
				log.error("The broker rejected our attempt to connect. Reason: " + reply);
			}
		} catch (Exception e) {
			log.error("An exception prevented the publishing of the full catalog.", e);
		} finally {
			// Whatever happened above, hang up if the client is still open.
			if (!client.isClosed()) {
				client.disconnect();
			}
		}
	}

}
						</pre>
					</div>
					<div class="post" id="mockbrokerapi">
						<h3>Using the Mock Broker API</h3>
						<p>All examples are included in the xenqtt jar in the <code>net.sf.xenqtt.examples</code> package.</p>
						<p class="code-header">Mock Broker - Vanilla</p>
						The following example shows how to fire up an instance of the Mock Broker within a running JVM. The mock broker that launches uses the
						default setup and configuration. No special handlers or eventing rules are employed.
						<pre class="prettyprint code">
package net.sf.xenqtt.examples;

import net.sf.xenqtt.mockbroker.MockBroker;
import net.sf.xenqtt.mockbroker.MockBrokerHandler;

/**
 * Fires up a mock broker that specializes in routing data of the 'Glam' variety. The broker runs with the stock handler, stays up for one minute and is
 * then shut down.
 */
public class GlamBroker {

	public static void main(String... args) throws InterruptedException {
		MockBroker glamBroker = new MockBroker(new MockBrokerHandler());

		// init() blocks until startup is complete, so the broker is online as soon as it returns.
		glamBroker.init();

		// Clients can now connect, publish messages, subscribe, etc. Give them a minute to do so.
		Thread.sleep(60000);

		// All done. A timeout of 0 waits forever for the shutdown; a value above 0 waits that many milliseconds.
		glamBroker.shutdown(0);
	}

}
						</pre>
						<p class="code-header">Mock Broker - Custom Handler</p>
						The following example shows how to fire up a mock broker instance in the local JVM. The mock broker is given a custom event handler.
						This handler is invoked as events move through the broker. Such handlers can be used to customize the behavior of the broker and prove
						extremely useful in testing.
						<pre class="prettyprint code">
package net.sf.xenqtt.examples;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

import net.sf.xenqtt.message.PubMessage;
import net.sf.xenqtt.message.QoS;
import net.sf.xenqtt.message.SubscribeMessage;
import net.sf.xenqtt.mockbroker.Client;
import net.sf.xenqtt.mockbroker.MockBroker;
import net.sf.xenqtt.mockbroker.MockBrokerHandler;

/**
 * Fires up a mock broker that specializes in routing data of the 'Glam' variety. This particular broker has a special handler that rejects any and all attempts
 * to interact with country music.
 */
public class GlamNoCountryBroker {

	public static void main(String... args) throws InterruptedException {
		MockBrokerHandler handler = new GlamBrokerHandler();
		MockBroker broker = new MockBroker(handler);

		broker.init(); // Blocks until startup is complete.

		// At this point the broker is online. Clients can connect to it, publish messages, subscribe, etc.
		Thread.sleep(60000);

		// We are done. Shutdown the broker. Wait forever (> 0 means wait that many milliseconds).
		broker.shutdown(0);
	}

	private static final class GlamBrokerHandler extends MockBrokerHandler {

		@Override
		public boolean publish(Client client, PubMessage message) throws Exception {
			String payload = new String(message.getPayload(), Charset.forName("UTF-8"));
			if (payload.indexOf("Country Music") > -1) {
				// We don't do that stuff here! Return true to suppress processing of the message
				return true;
			}

			return super.publish(client, message);
		}

		/**
		 * @see net.sf.xenqtt.mockbroker.MockBrokerHandler#subscribe(net.sf.xenqtt.mockbroker.Client, net.sf.xenqtt.message.SubscribeMessage)
		 */
		@Override
		public boolean subscribe(Client client, SubscribeMessage message) throws Exception {
			String[] topics = message.getTopics();
			QoS[] qoses = message.getRequestedQoSes();
			List<String> allowedTopics = new ArrayList<String>();
			List<QoS> allowedQoses = new ArrayList<QoS>();
			int index = 0;
			for (String topic : topics) {
				// Only allow topic subscriptions for topics that don't include country music.
				if (!topic.matches("^.*(?i:country).*$")) {
					allowedTopics.add(topic);
					allowedQoses.add(qoses[index]);
				}

				index++;
			}

			message = new SubscribeMessage(message.getMessageId(), allowedTopics.toArray(new String[0]), allowedQoses.toArray(new QoS[0]));

			return super.subscribe(client, message);
		}

	}

}
						</pre>
						<p>Please see the JavaDoc for a complete list of the methods that can be overridden on the <code>MockBrokerHandler</code>.</p>
						<p class="code-header">Mock Broker - Event Reporting</p>
						The following example shows how to fire up the mock broker in the local JVM. In addition to a custom handler the example shows how to
						interrogate the broker for certain events that pass through it. The information from these events can be used for many disparate
						purposes. These are typically application-specific.
						<pre class="prettyprint code">
package net.sf.xenqtt.examples;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import net.sf.xenqtt.message.MessageType;
import net.sf.xenqtt.message.PubMessage;
import net.sf.xenqtt.message.QoS;
import net.sf.xenqtt.message.SubscribeMessage;
import net.sf.xenqtt.mockbroker.BrokerEvent;
import net.sf.xenqtt.mockbroker.BrokerEventType;
import net.sf.xenqtt.mockbroker.Client;
import net.sf.xenqtt.mockbroker.MockBroker;
import net.sf.xenqtt.mockbroker.MockBrokerHandler;

/**
 * Fires up a mock broker that specializes in routing data of the 'Glam' variety. This particular broker has a special handler that rejects any and all attempts
 * to interact with country music. In addition, it allows for retrieval of events from the broker and their subsequent interrogation.
 */
public class GlamEventReportingBroker {

	public static void main(String... args) throws InterruptedException {
		MockBrokerHandler handler = new GlamBrokerHandler();
		MockBroker broker = new MockBroker(handler);

		broker.init(); // Blocks until startup is complete.

		// At this point the broker is online. Clients can connect to it, publish messages, subscribe, etc.
		Thread.sleep(60000);

		// Examine broker events and report some useful stats about them.
		List<BrokerEvent> events = broker.getEvents();
		Set<String> clientIds = new HashSet<String>();
		Map<BrokerEventType, Integer> eventTypes = new EnumMap<BrokerEventType, Integer>(BrokerEventType.class);
		Map<MessageType, Integer> messageTypes = new EnumMap<MessageType, Integer>(MessageType.class);
		for (BrokerEvent event : events) {
			clientIds.add(event.getClientId());

			BrokerEventType brokerEventType = event.getEventType();
			Integer brokerEventCount = eventTypes.get(brokerEventType);
			if (brokerEventCount == null) {
				brokerEventCount = Integer.valueOf(0);
			}
			eventTypes.put(brokerEventType, Integer.valueOf(brokerEventCount.intValue() + 1));

			MessageType messageType = event.getMessage().getMessageType();
			Integer messageTypeCount = messageTypes.get(messageType);
			if (messageTypeCount == null) {
				messageTypeCount = Integer.valueOf(0);
			}
			messageTypes.put(messageType, Integer.valueOf(messageTypeCount.intValue() + 1));
		}
		System.out.printf("Total events: %d\n", events.size());
		System.out.printf("Total client IDs: %d\n", clientIds.size());

		System.out.println("Counts by broker event type:");
		for (Entry<BrokerEventType, Integer> entry : eventTypes.entrySet()) {
			System.out.printf("\t%s: %d\n", entry.getKey().name(), entry.getValue());
		}

		System.out.printf("Counts by MQTT message type:");
		for (Entry<MessageType, Integer> entry : messageTypes.entrySet()) {
			System.out.printf("\t%s: %d\n", entry.getKey().name(), entry.getValue());
		}

		// We are done. Shutdown the broker. Wait forever (> 0 means wait that many milliseconds).
		broker.shutdown(0);
	}

	private static final class GlamBrokerHandler extends MockBrokerHandler {

		@Override
		public boolean publish(Client client, PubMessage message) throws Exception {
			String payload = new String(message.getPayload(), Charset.forName("UTF-8"));
			if (payload.indexOf("Country Music") > -1) {
				// We don't do that stuff here! Return true to suppress processing of the message
				return true;
			}

			return super.publish(client, message);
		}

		/**
		 * @see net.sf.xenqtt.mockbroker.MockBrokerHandler#subscribe(net.sf.xenqtt.mockbroker.Client, net.sf.xenqtt.message.SubscribeMessage)
		 */
		@Override
		public boolean subscribe(Client client, SubscribeMessage message) throws Exception {
			String[] topics = message.getTopics();
			QoS[] qoses = message.getRequestedQoSes();
			List<String> allowedTopics = new ArrayList<String>();
			List<QoS> allowedQoses = new ArrayList<QoS>();
			int index = 0;
			for (String topic : topics) {
				// Only allow topic subscriptions for topics that don't include country music.
				if (!topic.matches("^.*(?i:country).*$")) {
					allowedTopics.add(topic);
					allowedQoses.add(qoses[index]);
				}

				index++;
			}

			message = new SubscribeMessage(message.getMessageId(), allowedTopics.toArray(new String[0]), allowedQoses.toArray(new QoS[0]));

			return super.subscribe(client, message);
		}

	}

}
						</pre>
					</div>
					<div class="post post-last" id="links">
						<h3>Related Resources</h3>
						<a href="http://mqtt.org">MQTT Overview</a><img src="images/ext-link.gif" alt="" height="10" width="10" />
					</div>
				</div>
			</div>
		</div>
		<div id="footer">
			&copy; 2013 J2 Enterprises | Design by <a href="http://www.freecsstemplates.org/" rel="nofollow">FreeCSSTemplates.org</a> | Images by <a href="http://fotogrph.com/">Fotogrph</a>
		</div>
	</body>
</html>